package com.zwcl.common.component.thread;


import com.zwcl.common.core.exception.BaseException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;


/**
 * 用于处理异步请求
 *
 * @param <Task>
 * @param <Result>
 */
@Slf4j
@Data
public class AsyncBox<Task extends AbstractSimpleTask<Result>, Result> {

    // 用于存放加入的任务
//    private final List<Task> taskBox;

    private final Map<Thread, List<Task>> taskBox;

    // 用于接受处理的结果
    private final Map<Task, Future<Result>> resultBox;

    // 用于执行任务的线程池
    private final ExecutorService executorService;

    // 默认任务数=16,这里就使用HashMap和ArrayList的默认
    private final static int defaultTaskCount = 2 << 3;

//    // 该实例任务是否执行
//    private volatile boolean isExecuted;

    // 保存最终结果Data
    private List<Result> results;

    public AsyncBox(ExecutorService executorService) {
        Assert.notNull(executorService, () -> "传入线程池实例不能为空");
        this.executorService = executorService;
        // 如果任务很多这里就需要一直扩容，性能不是很高，
        // 所以使用LinkedList,使用LinkedList遍历速度也是可以
        this.taskBox = new ConcurrentHashMap<>(defaultTaskCount);
        this.resultBox = new ConcurrentHashMap<>(defaultTaskCount);
    }

    /**
     * 添加单个任务
     */
    private boolean add(Task task) {
        Thread t = Thread.currentThread();
        if (this.taskBox.containsKey(t)) {
            List<Task> tasks = this.taskBox.get(t);
            tasks.add(task);
            return true;
        }
        List<Task> tasks = new LinkedList<>();
        tasks.add(task);
        this.taskBox.put(t, tasks);
        return true;
    }


    /**
     * 往taskBox中添加任务
     */
    public AsyncBox<Task, Result> addTask(Task task) {
        this.add(task);
        return this;
    }

    /**
     * 批量添加任务
     */
    @SafeVarargs
    public final AsyncBox<Task, Result> addTask(Task... tasks) {
        for (Task task : tasks) {
            this.add(task);
        }
        return this;
    }

    /**
     * 将所有的异步请求放入加入线程池
     */
    public AsyncBox<Task, Result> execute() {
        Thread t = Thread.currentThread();
        List<Task> tasks = taskBox.getOrDefault(t, Collections.emptyList());
        for (Task task : tasks) {
            if (!task.isExecute()) {
                log.info("{}", tasks.size());
                log.info("{}加入线程池", task.name());
                // 设置任务状态=已执行
                task.setExecuted();
                resultBox.put(task, executorService.submit(task));
            }
        }

        return this;
    }


    /**
     * 根据任务名称获取任务对应的结果
     */
    public Result get(Task task) {

        // 检查该任务是否执行
        if (!task.isExecute()) {
            throw new BaseException("该任务尚未执行:" + task.name());
        }
        // 检查是否包含此任务的结果
        if (CollectionUtils.isEmpty(resultBox)
                || !resultBox.containsKey(task)) {
            log.error("任务结果不包含:{}", task.name());
            throw new BaseException("任务结果不包含:" + task.name());
        }
        try {
            return resultBox.get(task).get();
        } catch (ExecutionException
                | InterruptedException ex) {
            log.error("获取任务失败", ex);
//            throw new BizException("获取任务结果失败,任务名称:" + task.name());
            return null;
        } finally {
            // 获取到结果后，移除掉resultBox中的任务
            resultBox.remove(task);
        }
    }


    @SafeVarargs
    public final AsyncBox<Task, Result> merge(Task... tasks) {
        List<Result> results = new ArrayList<>(tasks.length);
        for (Task task : tasks) {
            Result result = this.get(task);
            if (result == null) continue;
            if (result instanceof Collection) {
                Collection<Result> t1 = (Collection<Result>) result;
                results.addAll(t1);
            } else {
                results.add(result);
            }
        }
        // 合并结果的时候，可以移除掉taskBox的中的任务
        taskBox.remove(Thread.currentThread());
        this.results = results;
        return this;
    }


    public AsyncBox<Task, Result> merge() {
//        if (!this.isExecuted) this.execute();
        if (CollectionUtils.isEmpty(resultBox)) {
            log.error("任务结果不包含");
            throw new BaseException("AsyncBox result结果集为空");
        }
        List<Result> results = new ArrayList<>();
        for (Map.Entry<Task, Future<Result>> entry : resultBox.entrySet()) {
            Result result = this.get(entry.getKey());
            if (result == null) continue;
            if (result instanceof Collection) {
                Collection<Result> t1 = (Collection<Result>) result;
                results.addAll(t1);
            } else {
                results.add(result);
            }
        }
        // 合并结果的时候，可以移除掉taskBox的中的任务
        taskBox.remove(Thread.currentThread());
        this.results = results;
        return this;
    }

    /**
     * 关闭线程池，如果想直接关闭不再进行后续任务的话
     */
    public AsyncBox<Task, Result> shutdown() {
        if (executorService != null
                && !executorService.isShutdown()
                && !executorService.isTerminated()) {
            this.executorService.shutdown();
        }
        return this;
    }

    public List<Result> getData() {
        return this.results;
    }

}
